جریان - راهنمای قطعی

با استفاده از Streams API، نحوه استفاده از جریان‌های قابل خواندن، قابل نوشتن و تبدیل را بیاموزید.

Streams API به شما این امکان را می‌دهد که به صورت برنامه‌نویسی به جریان‌های داده‌ای که از طریق شبکه دریافت می‌شوند یا با هر وسیله محلی ایجاد شده‌اند دسترسی داشته باشید و آنها را با جاوا اسکریپت پردازش کنید. استریم شامل تجزیه منبعی است که می‌خواهید دریافت کنید، ارسال کنید یا به تکه‌های کوچک تبدیل کنید و سپس این تکه‌ها را ذره ذره پردازش کنید. در حالی که پخش جریانی کاری است که مرورگرها هنگام دریافت دارایی‌هایی مانند HTML یا ویدیوها برای نمایش در صفحات وب انجام می‌دهند، این قابلیت قبل از اینکه fetch با استریم‌ها در سال 2015 معرفی شود، هرگز برای جاوا اسکریپت در دسترس نبوده است.

قبلاً، اگر می‌خواستید نوعی منبع را پردازش کنید (چه ویدیو باشد، چه یک فایل متنی و غیره)، باید کل فایل را دانلود کنید، منتظر بمانید تا به فرمت مناسب تبدیل شود و سپس آن را پردازش کنید. با در دسترس بودن جریان ها برای جاوا اسکریپت، همه اینها تغییر می کند. اکنون می‌توانید داده‌های خام را با جاوا اسکریپت به‌محض اینکه در کلاینت در دسترس قرار گرفت، بدون نیاز به ایجاد بافر، رشته یا حباب، به تدریج پردازش کنید. این کار تعدادی از موارد استفاده را باز می کند که برخی از آنها را در زیر لیست می کنم:

  • جلوه های ویدیویی: لوله گذاری یک جریان ویدیویی قابل خواندن از طریق یک جریان تبدیل که جلوه ها را در زمان واقعی اعمال می کند.
  • فشرده سازی داده ها: لوله گذاری یک جریان فایل از طریق یک جریان تبدیل که به طور انتخابی آن را فشرده می کند.
  • رمزگشایی تصویر: لوله گذاری یک جریان پاسخ HTTP از طریق یک جریان تبدیل که بایت ها را به داده های بیت مپ رمزگشایی می کند، و سپس از طریق جریان تبدیل دیگری که بیت مپ ها را به PNG ترجمه می کند. اگر در داخل fetch یک سرویس‌کار نصب شده باشد، این به شما امکان می‌دهد فرمت‌های تصویر جدید مانند AVIF را به صورت شفاف پر کنید.

پشتیبانی از مرورگر

ReadableStream و WritableStream

Browser Support

  • کروم: 43.
  • لبه: 14.
  • فایرفاکس: 65.
  • سافاری: 10.1.

Source

TransformStream

Browser Support

  • کروم: 67.
  • لبه: 79.
  • فایرفاکس: 102.
  • سافاری: 14.1.

Source

مفاهیم اصلی

قبل از اینکه به جزئیات انواع مختلف جریان ها بپردازم، اجازه دهید برخی از مفاهیم اصلی را معرفی کنم.

تکه ها

تکه تکه ای از داده است که در یک جریان نوشته می شود یا از آن خوانده می شود. می تواند از هر نوع باشد؛ جریان ها حتی می توانند شامل تکه هایی از انواع مختلف باشند. در بیشتر مواقع، یک قطعه اتمی ترین واحد داده برای یک جریان معین نخواهد بود. به عنوان مثال، یک جریان بایت ممکن است شامل تکه هایی از 16 کیلوبایت واحد Uint8Array به جای بایت های تک باشد.

جریان های خواندنی

یک جریان قابل خواندن نشان دهنده منبع داده ای است که می توانید از آن مطالعه کنید. به عبارت دیگر، داده ها از یک جریان قابل خواندن خارج می شوند . به طور مشخص، یک جریان قابل خواندن نمونه ای از کلاس ReadableStream است.

جریان های قابل نوشتن

یک جریان قابل نوشتن نشان دهنده مقصدی برای داده است که می توانید در آن بنویسید. به عبارت دیگر، داده ها به یک جریان قابل نوشتن وارد می شوند . به طور مشخص، یک جریان قابل نوشتن نمونه ای از کلاس WritableStream است.

جریان ها را متحول کنید

یک جریان تبدیل از یک جفت جریان تشکیل شده است: یک جریان قابل نوشتن، که به عنوان سمت قابل نوشتن آن شناخته می شود، و یک جریان قابل خواندن، که به عنوان سمت قابل خواندن آن شناخته می شود. یک استعاره در دنیای واقعی برای این می‌تواند یک مترجم همزمان باشد که از یک زبان به زبان دیگر در حال پرواز ترجمه می‌کند. به روشی خاص برای جریان تبدیل، نوشتن در سمت قابل نوشتن منجر به ارائه داده های جدید برای خواندن از سمت قابل خواندن می شود. بطور مشخص، هر شیئی با خاصیت writable و قابلیت readable می تواند به عنوان یک جریان تبدیل عمل کند. با این حال، کلاس TransformStream استاندارد، ایجاد چنین جفتی را که به درستی در هم پیچیده شده است، آسان تر می کند.

زنجیر لوله

جریان ها عمدتاً با لوله کشی آنها به یکدیگر استفاده می شوند. یک جریان قابل خواندن را می توان با استفاده از روش pipeTo() جریان قابل خواندن، مستقیماً به یک جریان قابل نوشتن لوله کرد، یا می توان آن را ابتدا از طریق یک یا چند جریان تبدیل با استفاده از روش pipeThrough() جریان خواندنی هدایت کرد. مجموعه‌ای از جریان‌ها که به این روش با هم لوله می‌شوند، زنجیره لوله نامیده می‌شوند.

پس فشار

هنگامی که یک زنجیره لوله ساخته می شود، سیگنال هایی در مورد اینکه تکه ها با چه سرعتی باید در آن جریان داشته باشند منتشر می کند. اگر هر مرحله‌ای در زنجیره هنوز نتواند تکه‌ها را بپذیرد، سیگنالی را به سمت عقب از طریق زنجیره لوله منتشر می‌کند تا در نهایت به منبع اصلی گفته شود که تولید تکه‌ها را با این سرعت متوقف کند. این فرآیند عادی سازی جریان، فشار برگشتی نامیده می شود.

تیینگ

یک جریان خوانا را می‌توان با استفاده از روش tee() تِید کرد (نام آن بر اساس شکل حرف T بزرگ). با این کار جریان قفل می شود، یعنی دیگر قابل استفاده مستقیم نیست. با این حال، دو جریان جدید به نام شاخه ایجاد می کند که می تواند به طور مستقل مصرف شود. Teeing همچنین مهم است زیرا جریان ها را نمی توان بازگردانی یا راه اندازی مجدد کرد، در ادامه در مورد این موضوع بیشتر توضیح خواهیم داد.

نمودار یک زنجیره لوله متشکل از یک جریان قابل خواندن که از فراخوانی به API واکشی می‌آید که سپس از طریق یک جریان تبدیل که خروجی آن تید می‌شود، لوله‌گذاری می‌شود و سپس برای اولین جریان قابل خواندن نتیجه به مرورگر و برای جریان قابل خواندن دوم به کش سرویس‌کار فرستاده می‌شود.
یک زنجیر لوله

مکانیک یک جریان خوانا

یک جریان قابل خواندن یک منبع داده است که در جاوا اسکریپت توسط یک شی ReadableStream که از یک منبع زیرین جریان می یابد، نمایش داده می شود. سازنده ReadableStream() یک شی جریان قابل خواندن را از کنترل کننده های داده شده ایجاد و برمی گرداند. دو نوع منبع اساسی وجود دارد:

  • هنگامی که به آنها دسترسی پیدا کردید، منابع فشار دائماً داده ها را به سمت شما می فرستند، و شروع، توقف یا لغو دسترسی به جریان به عهده شماست. به عنوان مثال می توان به جریان های ویدیویی زنده، رویدادهای ارسال شده توسط سرور یا WebSockets اشاره کرد.
  • منابع کششی از شما می‌خواهند که پس از اتصال به آنها صریحاً داده‌هایی را از آنها درخواست کنید. به عنوان مثال می توان به عملیات HTTP از طریق تماس های fetch() یا XMLHttpRequest اشاره کرد.

داده های جریان به صورت متوالی در قطعات کوچک به نام تکه خوانده می شوند. گفته می شود که تکه هایی که در یک جریان قرار می گیرند در صف قرار می گیرند. این بدان معنی است که آنها در یک صف آماده برای خواندن منتظر هستند. یک صف داخلی تکه هایی را که هنوز خوانده نشده اند ردیابی می کند.

یک استراتژی صف شیئی است که تعیین می کند چگونه یک جریان باید بر اساس وضعیت صف داخلی خود سیگنال فشار برگشتی را نشان دهد. استراتژی صف بندی یک اندازه به هر تکه اختصاص می دهد و اندازه کل همه تکه های صف را با یک عدد مشخص مقایسه می کند که به عنوان علامت آب بالا شناخته می شود.

تکه های داخل جریان توسط خواننده خوانده می شود. این خواننده داده ها را یک تکه در یک زمان بازیابی می کند و به شما امکان می دهد هر نوع عملیاتی را که می خواهید روی آن انجام دهید. خواننده به اضافه کد پردازش دیگری که همراه با آن است، مصرف کننده نامیده می شود.

ساختار بعدی در این زمینه کنترل کننده نامیده می شود. هر جریان قابل خواندن دارای یک کنترلر مرتبط است که همانطور که از نام آن پیداست به شما امکان می دهد جریان را کنترل کنید.

فقط یک خواننده می تواند یک جریان را در یک زمان بخواند. هنگامی که یک خواننده ایجاد می شود و شروع به خواندن یک جریان می کند (یعنی خواننده فعال می شود)، روی آن قفل می شود. اگر می‌خواهید خواننده دیگری مسئولیت خواندن جریان شما را بر عهده بگیرد، معمولاً باید قبل از انجام هر کار دیگری، اولین خواننده را آزاد کنید (اگرچه می‌توانید استریم‌ها را تیپ کنید ).

ایجاد یک جریان قابل خواندن

شما با فراخوانی سازنده آن ReadableStream() یک جریان قابل خواندن ایجاد می کنید. سازنده یک آرگومان اختیاری underlyingSource دارد، که یک شی را با متدها و ویژگی هایی نشان می دهد که نحوه رفتار نمونه جریان ساخته شده را مشخص می کند.

منبع underlyingSource

این می تواند از روش های اختیاری و تعریف شده توسط توسعه دهنده زیر استفاده کند:

  • start(controller) : بلافاصله پس از ساخت شیء فراخوانی می شود. این روش می‌تواند به منبع جریان دسترسی داشته باشد، و هر کار دیگری که برای تنظیم عملکرد جریان لازم است انجام دهد. اگر قرار باشد این فرآیند به صورت ناهمزمان انجام شود، روش می تواند یک وعده را به نشانه موفقیت یا شکست برگرداند. پارامتر controller ارسال شده به این روش یک ReadableStreamDefaultController است.
  • pull(controller) : می توان از آن برای کنترل جریان استفاده کرد زیرا تکه های بیشتری واکشی می شوند. تا زمانی که صف داخلی تکه های جریان پر نباشد، به طور مکرر فراخوانی می شود، تا زمانی که صف به علامت آب بالای خود برسد. اگر نتیجه فراخوانی pull() یک وعده باشد، pull() دوباره فراخوانی نخواهد شد تا زمانی که قول گفته شده محقق شود. اگر قول رد شود، جریان دچار خطا می شود.
  • cancel(reason) : هنگامی که مصرف کننده جریان جریان را لغو می کند، تماس می گیرد.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController از روش های زیر پشتیبانی می کند:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

دومین آرگومان سازنده سازنده ReadableStream() نیز اختیاری است queuingStrategy . این یک شی است که به صورت اختیاری یک استراتژی صف برای جریان تعریف می کند که دو پارامتر دارد:

  • highWaterMark : یک عدد غیر منفی که نشان دهنده علامت بالای آب جریان با استفاده از این استراتژی صف بندی است.
  • size(chunk) : تابعی که اندازه غیر منفی متناهی مقدار قطعه داده شده را محاسبه و برمی گرداند. نتیجه برای تعیین فشار برگشتی استفاده می‌شود که از طریق ویژگی ReadableStreamDefaultController.desiredSize مناسب آشکار می‌شود. همچنین زمانی که متد pull() منبع زیربنایی فراخوانی شود، حاکم است.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

متدهای getReader() و read()

برای خواندن از یک جریان قابل خواندن، به یک خواننده نیاز دارید که ReadableStreamDefaultReader خواهد بود. متد getReader() رابط ReadableStream یک خواننده ایجاد می کند و جریان را روی آن قفل می کند. در حالی که جریان قفل است، تا زمانی که این یکی منتشر نشود، نمی توان خواننده دیگری را به دست آورد.

متد read() واسط ReadableStreamDefaultReader قولی را برمی‌گرداند که به قسمت بعدی در صف داخلی جریان دسترسی پیدا می‌کند. بسته به وضعیت جریان، نتیجه ای را برآورده یا رد می کند. احتمالات مختلف به شرح زیر است:

  • اگر تکه ای موجود باشد، قول با یک شی از فرم محقق می شود
    { value: chunk, done: false } .
  • اگر جریان بسته شود، قول با یک شی از شکل محقق می شود
    { value: undefined, done: true } .
  • در صورت خطا شدن جریان، قول با خطای مربوطه رد می شود.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

ملک locked

با دسترسی به ویژگی ReadableStream.locked آن، می توانید بررسی کنید که آیا یک جریان قابل خواندن قفل شده است.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نمونه کد جریان قابل خواندن

نمونه کد زیر تمام مراحل را در عمل نشان می دهد. ابتدا یک ReadableStream ایجاد می کنید که در آرگومان underlyingSource آن (یعنی کلاس TimestampSource ) یک متد start() را تعریف می کند. این متد به controller استریم می گوید که هر ثانیه در طول ده ثانیه یک timestamp enqueue() . در نهایت، به کنترل کننده می گوید که جریان را close() . شما این جریان را با ایجاد یک خواننده از طریق متد getReader() و فراخوانی read() مصرف می کنید تا پخش جریانی done شود.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

تکرار ناهمزمان

بررسی هر تکرار حلقه read() در صورت done استریم ممکن است راحت‌ترین API نباشد. خوشبختانه به زودی راه بهتری برای انجام این کار وجود خواهد داشت: تکرار ناهمزمان.

for await (const chunk of stream) {
  console.log(chunk);
}

امروزه یک راه حل برای استفاده از تکرار ناهمزمان، پیاده سازی رفتار با پلی پر است.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

پخش جریانی خواندنی

متد tee() واسط ReadableStream جریان قابل خواندن فعلی را به تصویر می کشد و یک آرایه دو عنصری حاوی دو شاخه به دست آمده را به عنوان نمونه های ReadableStream جدید برمی گرداند. این به دو خواننده اجازه می دهد تا یک جریان را به طور همزمان بخوانند. برای مثال، اگر می‌خواهید پاسخی را از سرور دریافت کنید و آن را به مرورگر پخش کنید، و همچنین آن را به کش سرویس‌کار نیز پخش کنید، ممکن است این کار را انجام دهید. از آنجایی که یک بدنه پاسخ نمی تواند بیش از یک بار مصرف شود، برای انجام این کار به دو نسخه نیاز دارید. برای لغو جریان، باید هر دو شاخه حاصل را لغو کنید. پخش جریانی معمولاً آن را برای مدت زمان قفل می کند و از قفل کردن آن توسط سایر خوانندگان جلوگیری می کند.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

جریان های بایت قابل خواندن

برای جریان‌هایی که بایت‌ها را نشان می‌دهند، یک نسخه توسعه‌یافته از جریان قابل خواندن برای مدیریت کارآمد بایت‌ها، به ویژه با به حداقل رساندن کپی‌ها، ارائه شده است. جریان‌های بایتی اجازه می‌دهند تا خوانندگان بافر خود را بیاورید (BYOB). اجرای پیش‌فرض می‌تواند طیفی از خروجی‌های مختلف مانند رشته‌ها یا بافرهای آرایه را در مورد WebSockets ارائه دهد، در حالی که جریان‌های بایت خروجی بایت را تضمین می‌کنند. علاوه بر این، خوانندگان BYOB مزایای پایداری دارند. این به این دلیل است که اگر یک بافر جدا شود، می تواند تضمین کند که یک بافر دو بار در یک بافر نوشته نمی شود، بنابراین از شرایط مسابقه اجتناب می شود. خواننده های BYOB می توانند تعداد دفعاتی را که مرورگر برای اجرای جمع آوری زباله نیاز دارد کاهش دهد، زیرا می تواند از بافرها استفاده مجدد کند.

ایجاد یک جریان بایت قابل خواندن

شما می توانید با ارسال یک پارامتر type اضافی به سازنده ReadableStream() یک جریان بایت قابل خواندن ایجاد کنید.

new ReadableStream({ type: 'bytes' });

منبع underlyingSource

منبع اصلی یک جریان بایت قابل خواندن یک ReadableByteStreamController برای دستکاری داده می شود. متد ReadableByteStreamController.enqueue() آن یک آرگومان chunk می گیرد که مقدار آن ArrayBufferView است. ویژگی ReadableByteStreamController.byobRequest درخواست کشش فعلی BYOB را برمی‌گرداند یا اگر وجود نداشته باشد، تهی است. در نهایت، ویژگی ReadableByteStreamController.desiredSize اندازه مورد نظر را برای پر کردن صف داخلی جریان کنترل شده برمی گرداند.

queuingStrategy

دومین آرگومان سازنده سازنده ReadableStream() نیز اختیاری است queuingStrategy . این یک شی است که به صورت اختیاری یک استراتژی صف برای جریان تعریف می کند که یک پارامتر را می گیرد:

  • highWaterMark : تعداد غیرمنفی بایت‌ها که نشان‌دهنده علامت آبی بالای جریان با استفاده از این استراتژی صف‌بندی است. این برای تعیین فشار برگشتی استفاده می شود که از طریق ویژگی ReadableByteStreamController.desiredSize مناسب نشان داده می شود. همچنین زمانی که متد pull() منبع زیربنایی فراخوانی شود، حاکم است.

متدهای getReader() و read()

سپس می توانید با تنظیم پارامتر mode مطابق با آن، به ReadableStreamBYOBReader دسترسی پیدا کنید: ReadableStream.getReader({ mode: "byob" }) . این امکان کنترل دقیق تری بر تخصیص بافر را فراهم می کند تا از کپی جلوگیری شود. برای خواندن از جریان بایت، باید ReadableStreamBYOBReader.read(view) را فراخوانی کنید، جایی که view یک ArrayBufferView است.

نمونه کد جریان بایت قابل خواندن

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

تابع زیر جریان‌های بایت قابل خواندن را برمی‌گرداند که امکان خواندن کارآمد صفر کپی آرایه‌ای که به‌طور تصادفی تولید می‌شود را فراهم می‌کند. به جای استفاده از اندازه قطعه از پیش تعیین شده 1024، سعی می کند بافر ارائه شده توسط توسعه دهنده را پر کند و امکان کنترل کامل را فراهم می کند.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

مکانیک یک جریان قابل نوشتن

یک جریان قابل نوشتن مقصدی است که می‌توانید داده‌هایی را در آن بنویسید که در جاوا اسکریپت توسط یک شی WritableStream نمایش داده می‌شود. این به عنوان یک انتزاع در بالای یک سینک زیرین عمل می کند - یک ورودی/خروجی سطح پایین تر که داده های خام در آن نوشته می شود.

داده‌ها از طریق یک رایتر ، یک تکه در یک زمان، روی جریان نوشته می‌شوند. یک تکه می تواند اشکال مختلفی داشته باشد، درست مانند تکه های یک خواننده. می توانید از هر کدی که دوست دارید برای تولید تکه های آماده برای نوشتن استفاده کنید. نویسنده به اضافه کد مرتبط تولید کننده نامیده می شود.

هنگامی که یک نویسنده ایجاد می شود و شروع به نوشتن در یک جریان (یک نویسنده فعال ) می کند، گفته می شود که به آن قفل شده است. فقط یک نویسنده می تواند در یک جریان قابل نوشتن در یک زمان بنویسد. اگر می‌خواهید نویسنده دیگری شروع به نوشتن در جریان شما کند، معمولاً باید آن را منتشر کنید، قبل از اینکه نویسنده دیگری را به آن وصل کنید.

یک صف داخلی تکه هایی را که در جریان نوشته شده اند اما هنوز توسط سینک زیرین پردازش نشده اند، پیگیری می کند.

یک استراتژی صف شیئی است که تعیین می کند چگونه یک جریان باید بر اساس وضعیت صف داخلی خود سیگنال فشار برگشتی را نشان دهد. استراتژی صف بندی یک اندازه به هر تکه اختصاص می دهد و اندازه کل همه تکه های صف را با یک عدد مشخص مقایسه می کند که به عنوان علامت آب بالا شناخته می شود.

ساختار نهایی یک کنترل کننده نامیده می شود. هر جریان قابل نوشتن دارای یک کنترلر مرتبط است که به شما امکان می دهد جریان را کنترل کنید (به عنوان مثال، آن را لغو کنید).

ایجاد یک جریان قابل نوشتن

رابط WritableStream از Streams API یک انتزاع استاندارد برای نوشتن داده های جریانی در یک مقصد ارائه می دهد که به عنوان سینک شناخته می شود. این شی با فشار پشتی و صف داخلی همراه است. شما یک جریان قابل نوشتن با فراخوانی سازنده آن WritableStream() ایجاد می کنید. دارای یک پارامتر underlyingSink اختیاری است که یک شی را با روش ها و ویژگی هایی نشان می دهد که نحوه رفتار نمونه جریان ساخته شده را مشخص می کند.

سینک underlyingSink

underlyingSink می‌تواند شامل روش‌های اختیاری و تعریف‌شده توسط توسعه‌دهنده زیر باشد. پارامتر controller ارسال شده به برخی از متدها WritableStreamDefaultController است.

  • start(controller) : این متد بلافاصله پس از ساخت شیء فراخوانی می شود. محتوای این روش باید به منظور دسترسی به سینک زیرین باشد. اگر قرار باشد این فرآیند به صورت ناهمزمان انجام شود، می تواند یک وعده را به نشانه موفقیت یا شکست برگرداند.
  • write(chunk, controller) : این روش زمانی فراخوانی می شود که یک قطعه جدید از داده (که در پارامتر chunk مشخص شده است) برای نوشتن در سینک زیرین آماده باشد. می تواند یک وعده را به سیگنال موفقیت یا شکست عملیات نوشتن بازگرداند. این روش تنها پس از موفقیت در نوشتن قبلی فراخوانی می شود، و هرگز پس از بسته شدن یا قطع شدن جریان.
  • close(controller) : این روش در صورتی فراخوانی می شود که برنامه سیگنال دهد که نوشتن تکه ها در جریان به پایان رسیده است. محتویات باید هر کاری را که لازم است برای نهایی کردن نوشته‌ها در سینک زیرین و آزاد کردن دسترسی به آن انجام دهند. اگر این فرآیند ناهمزمان باشد، می تواند یک وعده را به نشانه موفقیت یا شکست بازگرداند. این متد تنها پس از موفقیت آمیز بودن تمام نوشتن های در صف فراخوانی می شود.
  • abort(reason) : این روش در صورتی فراخوانی می شود که برنامه سیگنال دهد که می خواهد به طور ناگهانی جریان را ببندد و آن را در حالت خطا قرار دهد. می تواند هر منبع نگهداری شده را پاک کند، مانند close() ، اما abort() حتی اگر نوشته ها در صف قرار گیرند فراخوانی می شود. آن تکه ها دور ریخته خواهند شد. اگر این فرآیند ناهمزمان باشد، می تواند یک وعده را به نشانه موفقیت یا شکست بازگرداند. پارامتر reason حاوی یک DOMString است که علت قطع جریان را توضیح می دهد.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

رابط WritableStreamDefaultController در Streams API کنترل‌کننده‌ای را نشان می‌دهد که امکان کنترل وضعیت WritableStream را در حین راه‌اندازی می‌دهد، زیرا تکه‌های بیشتری برای نوشتن ارسال می‌شوند، یا در پایان نوشتن. هنگام ساخت WritableStream ، به سینک زیرین یک نمونه WritableStreamDefaultController مربوطه برای دستکاری داده می شود. WritableStreamDefaultController تنها یک روش دارد: WritableStreamDefaultController.error() که هر گونه تعاملات آتی با جریان مرتبط را با خطا مواجه می کند. WritableStreamDefaultController همچنین از ویژگی signal پشتیبانی می‌کند که نمونه‌ای از AbortSignal را برمی‌گرداند و اجازه می‌دهد عملیات WritableStream در صورت نیاز متوقف شود.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

آرگومان دوم سازنده WritableStream() نیز اختیاری است queuingStrategy . این یک شی است که به صورت اختیاری یک استراتژی صف برای جریان تعریف می کند که دو پارامتر دارد:

  • highWaterMark : یک عدد غیر منفی که نشان دهنده علامت بالای آب جریان با استفاده از این استراتژی صف بندی است.
  • size(chunk) : تابعی که اندازه غیر منفی متناهی مقدار قطعه داده شده را محاسبه و برمی گرداند. از نتیجه برای تعیین فشار برگشتی استفاده می‌شود که از طریق ویژگی WritableStreamDefaultWriter.desiredSize مناسب آشکار می‌شود.

متدهای getWriter() و write()

برای نوشتن در یک جریان قابل نوشتن، به یک نویسنده نیاز دارید که WritableStreamDefaultWriter باشد. متد getWriter() رابط WritableStream یک نمونه جدید از WritableStreamDefaultWriter را برمی گرداند و جریان را روی آن نمونه قفل می کند. در حالی که جریان قفل است، تا زمانی که نسخه فعلی منتشر نشود، نمی توان نویسنده دیگری را به دست آورد.

متد write() واسط WritableStreamDefaultWriter یک تکه داده ارسال شده را به WritableStream و سینک زیرین آن می نویسد، سپس یک وعده را برمی گرداند که موفقیت یا شکست عملیات نوشتن را نشان می دهد. توجه داشته باشید که "موفقیت" به معنای آن است. ممکن است نشان دهد که این قطعه پذیرفته شده است، و نه لزوماً اینکه به طور ایمن به مقصد نهایی خود ذخیره شده است.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

ملک locked

با دسترسی به ویژگی WritableStream.locked آن، می‌توانید بررسی کنید که آیا یک جریان قابل نوشتن قفل شده است.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نمونه کد جریان قابل نوشتن

نمونه کد زیر تمام مراحل را در عمل نشان می دهد.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

انتقال یک جریان قابل خواندن به یک جریان قابل نوشتن

یک جریان قابل خواندن را می توان از طریق روش pipeTo() جریان قابل خواندن به یک جریان قابل نوشتن لوله کرد. ReadableStream.pipeTo() ReadableStream کنونی را به یک WritableStream داده شده لوله می کند و وعده ای را برمی گرداند که زمانی که فرآیند لوله کشی با موفقیت کامل شد محقق می شود، یا در صورت بروز هرگونه خطا، آن را رد می کند.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

ایجاد یک جریان تبدیل

رابط TransformStream از Streams API مجموعه ای از داده های قابل تبدیل را نشان می دهد. شما یک جریان تبدیل را با فراخوانی سازنده آن TransformStream() ایجاد می کنید که یک شی جریان تبدیل را از کنترل کننده های داده شده ایجاد و برمی گرداند. سازنده TransformStream() به عنوان اولین آرگومان خود یک شی جاوا اسکریپت اختیاری که transformer نشان می دهد می پذیرد. چنین اشیایی می توانند شامل یکی از روش های زیر باشند:

transformer

  • start(controller) : این متد بلافاصله پس از ساخت شیء فراخوانی می شود. معمولاً از این برای قرار دادن تکه‌های پیشوند با استفاده از controller.enqueue() استفاده می‌شود. این تکه ها از سمت قابل خواندن خوانده می شوند اما به هیچ نوشته ای در سمت قابل نوشتن بستگی ندارند. اگر این فرآیند اولیه ناهمزمان باشد، برای مثال به این دلیل که برای بدست آوردن تکه‌های پیشوند مقداری تلاش لازم است، تابع می‌تواند یک وعده را به سیگنال موفقیت یا شکست برگرداند. یک وعده رد شده جریان را با خطا مواجه می کند. هر استثنای پرتاب شده توسط سازنده TransformStream() دوباره پرتاب می شود.
  • transform(chunk, controller) : این روش زمانی فراخوانی می شود که یک قطعه جدید که در ابتدا در سمت قابل نوشتن نوشته شده بود آماده تبدیل شود. اجرای جریان تضمین می‌کند که این تابع تنها پس از موفقیت در تبدیل‌های قبلی فراخوانی می‌شود، و هرگز قبل از اتمام start() یا بعد از flush() فراخوانی نمی‌شود. این تابع کار تبدیل واقعی جریان تبدیل را انجام می دهد. می تواند نتایج را با استفاده از controller.enqueue() در صف قرار دهد. این اجازه می‌دهد که یک تکه تکه نوشته شده در سمت قابل نوشتن، بسته به تعداد دفعاتی که controller.enqueue() فراخوانی شود، به صفر یا چند تکه در سمت قابل خواندن منجر شود. اگر فرآیند تبدیل ناهمزمان باشد، این تابع می تواند یک وعده را به سیگنال موفقیت یا شکست تبدیل بازگرداند. یک وعده رد شده هر دو طرف قابل خواندن و نوشتن جریان تبدیل را دچار خطا می کند. اگر متد transform() ارائه نشده باشد، تبدیل هویت استفاده می‌شود که تکه‌های بدون تغییر از سمت قابل نوشتن به سمت قابل خواندن را در صف قرار می‌دهد.
  • flush(controller) : این متد پس از اینکه تمام تکه های نوشته شده در سمت قابل نوشتن با عبور موفقیت آمیز از transform() تبدیل شده اند و سمت قابل نوشتن در شرف بسته شدن است فراخوانی می شود. معمولاً از این برای قرار دادن تکه‌های پسوند در سمت قابل خواندن، قبل از بسته شدن استفاده می‌شود. اگر فرآیند فلاشینگ ناهمزمان باشد، تابع می تواند یک وعده را به سیگنال موفقیت یا شکست برگرداند. نتیجه به تماس گیرنده stream.writable.write() اطلاع رسانی خواهد شد. علاوه بر این، یک وعده رد شده هر دو طرف قابل خواندن و نوشتن جریان را با خطا مواجه می کند. پرتاب یک استثنا مانند بازگرداندن قول رد شده تلقی می شود.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

استراتژی های writableStrategy و readableStrategy صف بندی

دومین و سومین پارامتر اختیاری سازنده TransformStream() عبارتند از استراتژی های writableStrategy اختیاری و صف بندی readableStrategy . آنها به ترتیب در بخش‌های جریان خواندنی و قابل نوشتن مشخص شده‌اند.

تبدیل نمونه کد جریان

نمونه کد زیر یک جریان تبدیل ساده را در عمل نشان می دهد.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

لوله گذاری یک جریان قابل خواندن از طریق یک جریان تبدیل

متد pipeThrough() رابط ReadableStream یک روش زنجیره‌ای برای لوله‌کشی جریان جاری از طریق یک جریان تبدیل یا هر جفت قابل نوشتن/خواندن دیگری ارائه می‌کند. لوله کشی یک جریان عموماً آن را در طول مدت لوله قفل می کند و از قفل شدن آن توسط سایر خوانندگان جلوگیری می کند.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

نمونه کد بعدی (کمی ساختگی) نشان می‌دهد که چگونه می‌توانید یک نسخه «فریاد» از fetch() را پیاده‌سازی کنید که تمام متن را با استفاده از قول پاسخ برگشتی به‌عنوان یک جریان و بزرگ‌کردن تکه به تکه، بزرگ‌نمایی می‌کند. مزیت این روش این است که شما نیازی به صبر کردن برای دانلود کل سند ندارید، که می تواند تفاوت بزرگی در هنگام کار با فایل های بزرگ ایجاد کند.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

نسخه ی نمایشی

دمو زیر جریان های قابل خواندن، قابل نوشتن و تبدیل را در عمل نشان می دهد. همچنین شامل نمونه‌هایی از زنجیره‌های لوله pipeThrough() و pipeTo() می‌شود و همچنین tee() را نشان می‌دهد. شما می توانید به صورت اختیاری نسخه ی نمایشی را در پنجره خود اجرا کنید یا کد منبع را مشاهده کنید.

جریان های مفید موجود در مرورگر

تعدادی جریان مفید مستقیماً در مرورگر تعبیه شده است. شما به راحتی می توانید یک ReadableStream از یک حباب ایجاد کنید. متد stream() واسط Blob یک ReadableStream برمی گرداند که پس از خواندن داده های موجود در blob را برمی گرداند. همچنین به یاد داشته باشید که یک شی File نوع خاصی از Blob است و می تواند در هر زمینه ای که یک blob می تواند استفاده شود.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

انواع جریان TextDecoder.decode() و TextEncoder.encode() به ترتیب TextDecoderStream و TextEncoderStream نامیده می شوند.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

فشرده سازی یا از حالت فشرده خارج کردن یک فایل به ترتیب با جریان های تبدیل CompressionStream و DecompressionStream آسان است. نمونه کد زیر نشان می دهد که چگونه می توانید مشخصات Streams را دانلود کنید، آن را مستقیماً در مرورگر فشرده (gzip) کنید و فایل فشرده شده را مستقیماً روی دیسک بنویسید.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream API دسترسی به فایل سیستم و جریان های درخواست fetch() نمونه هایی از جریان های قابل نوشتن در طبیعت هستند.

Serial API از هر دو جریان قابل خواندن و نوشتن استفاده زیادی می کند.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

در نهایت، WebSocketStream API جریان ها را با WebSocket API یکپارچه می کند.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

منابع مفید

قدردانی ها

این مقاله توسط جیک آرچیبالد ، فرانسوا بوفور ، سام داتون ، ماتیاس بوئلنس ، سورما ، جو مدلی و آدام رایس بررسی شده است. پست های وبلاگ جیک آرچیبالد به من در درک جریان ها کمک زیادی کرده است. برخی از نمونه‌های کد از کاوش‌های کاربر GitHub @bellbind الهام گرفته شده‌اند و بخش‌هایی از نثر به شدت بر روی اسناد وب MDN در Streams ساخته شده‌اند. نویسندگان Streams Standard کار فوق العاده ای در نوشتن این مشخصات انجام داده اند. تصویر قهرمان توسط رایان لارا در Unsplash .

،

با استفاده از Streams API، نحوه استفاده از جریان‌های قابل خواندن، قابل نوشتن و تبدیل را بیاموزید.

Streams API به شما این امکان را می‌دهد که به صورت برنامه‌نویسی به جریان‌های داده‌ای که از طریق شبکه دریافت می‌شوند یا با هر وسیله محلی ایجاد شده‌اند دسترسی داشته باشید و آنها را با جاوا اسکریپت پردازش کنید. استریم شامل تجزیه منبعی است که می‌خواهید دریافت کنید، ارسال کنید یا به تکه‌های کوچک تبدیل کنید و سپس این تکه‌ها را ذره ذره پردازش کنید. در حالی که پخش جریانی کاری است که مرورگرها هنگام دریافت دارایی‌هایی مانند HTML یا ویدیوها برای نمایش در صفحات وب انجام می‌دهند، این قابلیت قبل از اینکه fetch با استریم‌ها در سال 2015 معرفی شود، هرگز برای جاوا اسکریپت در دسترس نبوده است.

قبلاً، اگر می‌خواستید نوعی منبع را پردازش کنید (چه ویدیو باشد، چه یک فایل متنی و غیره)، باید کل فایل را دانلود کنید، منتظر بمانید تا به فرمت مناسب تبدیل شود و سپس آن را پردازش کنید. با در دسترس بودن جریان ها برای جاوا اسکریپت، همه اینها تغییر می کند. اکنون می‌توانید داده‌های خام را با جاوا اسکریپت به‌محض اینکه در کلاینت در دسترس قرار گرفت، بدون نیاز به ایجاد بافر، رشته یا حباب، به تدریج پردازش کنید. این کار تعدادی از موارد استفاده را باز می کند که برخی از آنها را در زیر لیست می کنم:

  • جلوه های ویدیویی: لوله گذاری یک جریان ویدیویی قابل خواندن از طریق یک جریان تبدیل که جلوه ها را در زمان واقعی اعمال می کند.
  • فشرده سازی داده ها: لوله گذاری یک جریان فایل از طریق یک جریان تبدیل که به طور انتخابی آن را فشرده می کند.
  • رمزگشایی تصویر: لوله گذاری یک جریان پاسخ HTTP از طریق یک جریان تبدیل که بایت ها را به داده های بیت مپ رمزگشایی می کند، و سپس از طریق جریان تبدیل دیگری که بیت مپ ها را به PNG ترجمه می کند. اگر در داخل fetch یک سرویس‌کار نصب شده باشد، این به شما امکان می‌دهد فرمت‌های تصویر جدید مانند AVIF را به صورت شفاف پر کنید.

پشتیبانی از مرورگر

ReadableStream و WritableStream

Browser Support

  • کروم: 43.
  • لبه: 14.
  • فایرفاکس: 65.
  • سافاری: 10.1.

Source

TransformStream

Browser Support

  • کروم: 67.
  • لبه: 79.
  • فایرفاکس: 102.
  • سافاری: 14.1.

Source

مفاهیم اصلی

قبل از اینکه به جزئیات انواع مختلف جریان ها بپردازم، اجازه دهید برخی از مفاهیم اصلی را معرفی کنم.

تکه ها

تکه تکه ای از داده است که در یک جریان نوشته می شود یا از آن خوانده می شود. می تواند از هر نوع باشد؛ جریان ها حتی می توانند شامل تکه هایی از انواع مختلف باشند. در بیشتر مواقع، یک قطعه اتمی ترین واحد داده برای یک جریان معین نخواهد بود. به عنوان مثال، یک جریان بایت ممکن است شامل تکه هایی از 16 کیلوبایت واحد Uint8Array به جای بایت های تک باشد.

جریان های خواندنی

یک جریان قابل خواندن نشان دهنده منبع داده ای است که می توانید از آن مطالعه کنید. به عبارت دیگر، داده ها از یک جریان قابل خواندن خارج می شوند . به طور مشخص، یک جریان قابل خواندن نمونه ای از کلاس ReadableStream است.

جریان های قابل نوشتن

یک جریان قابل نوشتن نشان دهنده مقصدی برای داده است که می توانید در آن بنویسید. به عبارت دیگر، داده ها به یک جریان قابل نوشتن وارد می شوند . به طور مشخص، یک جریان قابل نوشتن نمونه ای از کلاس WritableStream است.

جریان ها را متحول کنید

یک جریان تبدیل از یک جفت جریان تشکیل شده است: یک جریان قابل نوشتن، که به عنوان سمت قابل نوشتن آن شناخته می شود، و یک جریان قابل خواندن، که به عنوان سمت قابل خواندن آن شناخته می شود. یک استعاره در دنیای واقعی برای این می‌تواند یک مترجم همزمان باشد که از یک زبان به زبان دیگر در حال پرواز ترجمه می‌کند. به روشی خاص برای جریان تبدیل، نوشتن در سمت قابل نوشتن منجر به ارائه داده های جدید برای خواندن از سمت قابل خواندن می شود. بطور مشخص، هر شیئی با خاصیت writable و قابلیت readable می تواند به عنوان یک جریان تبدیل عمل کند. با این حال، کلاس TransformStream استاندارد، ایجاد چنین جفتی را که به درستی در هم پیچیده شده است، آسان تر می کند.

زنجیر لوله

جریان ها عمدتاً با لوله کشی آنها به یکدیگر استفاده می شوند. یک جریان قابل خواندن را می توان با استفاده از روش pipeTo() جریان قابل خواندن، مستقیماً به یک جریان قابل نوشتن لوله کرد، یا می توان آن را ابتدا از طریق یک یا چند جریان تبدیل با استفاده از روش pipeThrough() جریان خواندنی هدایت کرد. مجموعه ای از جریانهای لوله کشی شده از این طریق به عنوان زنجیره لوله ای گفته می شود.

فشار

پس از ساخت زنجیره لوله ، سیگنالهایی را در مورد چگونگی جریان سریع قطعات از طریق آن پخش می کند. اگر هر قدم در زنجیره هنوز نتواند تکه ها را بپذیرد ، سیگنال را به سمت عقب از طریق زنجیره لوله پخش می کند ، تا اینکه در نهایت به منبع اصلی گفته می شود که تولید سریع قطعات را متوقف کند. این فرآیند عادی سازی جریان ، فشار خون نامیده می شود.

توری

با استفاده از روش tee() آن می توان یک جریان قابل خواندن را (نامگذاری شده به شکل یک حروف بزرگ)) انجام داد. این جریان را قفل می کند ، یعنی دیگر آن را مستقیماً قابل استفاده نمی کند. با این حال ، این دو جریان جدید به نام شاخه هایی ایجاد می کند که می توانند به طور مستقل مصرف شوند. Teeing همچنین مهم است زیرا جریان ها نمی توانند دوباره مجدداً جمع شوند یا دوباره شروع به کار کنند ، بیشتر در مورد این موضوع بعداً.

نمودار یک زنجیره لوله ای متشکل از یک جریان قابل خواندن که از یک تماس به API Fetch است که از طریق یک جریان تبدیل می شود که خروجی آن از طریق آن لوله کشی شده است و سپس برای اولین جریان قابل خواندن به مرورگر ارسال می شود و به حافظه نهان سرویس دهنده برای دوم جریان قابل خواندن است.
یک زنجیره لوله

مکانیک یک جریان قابل خواندن

یک جریان قابل خواندن یک منبع داده است که توسط یک شیء ReadableStream در JavaScript نشان داده شده است که از یک منبع اساسی جریان دارد. سازنده ReadableStream() یک شیء جریان قابل خواندن را از دستگیرهای داده شده ایجاد و برمی گرداند. دو نوع منبع اساسی وجود دارد:

  • منابع را فشار دهید که هنگام دسترسی به آنها ، داده ها را به طور مداوم به سمت شما فشار می دهد و شروع ، مکث یا لغو دسترسی به جریان بر عهده شماست. مثالها شامل جریان های ویدئویی زنده ، رویدادهای سرور یا وب سایت ها است.
  • منابع کشش شما را ملزم به درخواست صریح داده ها از آنها یک بار وصل می کنند. مثالها شامل عملیات HTTP از طریق تماس های fetch() یا XMLHttpRequest است.

داده های جریان به صورت متوالی در قطعات کوچک به نام تکه ها خوانده می شوند. گفته می شود که تکه های قرار داده شده در یک جریان مورد استفاده قرار می گیرند. این بدان معناست که آنها در صف منتظر خواندن هستند. یک صف داخلی قطعاتی را که هنوز خوانده نشده است ، ردیابی می کند.

استراتژی صف بندی شیئی است که تعیین می کند که چگونه یک جریان باید فشار را بر اساس وضعیت صف داخلی خود نشان دهد. استراتژی صف بندی اندازه را به هر قطعه اختصاص می دهد و اندازه کل تمام تکه های موجود در صف را با یک عدد مشخص ، معروف به علامت آب بالا مقایسه می کند.

تکه های داخل جریان توسط یک خواننده خوانده می شود. این خواننده داده ها را به طور همزمان بازیابی می کند و به شما امکان می دهد هر نوع عملیاتی را که می خواهید روی آن انجام دهید انجام دهید. خواننده به علاوه کد پردازش دیگر که همراه آن است ، مصرف کننده نامیده می شود.

ساختار بعدی در این زمینه کنترل کننده نامیده می شود. هر جریان قابل خواندن دارای یک کنترلر مرتبط است که همانطور که از نام آن پیداست ، به شما امکان می دهد جریان را کنترل کنید.

فقط یک خواننده می تواند یک جریان را همزمان بخواند. هنگامی که یک خواننده ایجاد می شود و شروع به خواندن یک جریان می کند (یعنی خواننده فعال می شود) ، به آن قفل می شود. اگر می خواهید یک خواننده دیگر خواندن جریان خود را به عهده بگیرد ، معمولاً قبل از انجام هر کار دیگری باید اولین خواننده را آزاد کنید (اگرچه می توانید جریان ها را انجام دهید ).

ایجاد یک جریان قابل خواندن

شما با فراخوانی سازنده آن ReadableStream() یک جریان قابل خواندن ایجاد می کنید. سازنده دارای یک آرگومان اختیاری است underlyingSource ، که یک شیء با روش ها و خصوصیاتی را نشان می دهد که نحوه رفتار نمونه جریان ساخته شده را تعریف می کند.

زمینه underlyingSource

این می تواند از روشهای اختیاری تعریف شده و تعریف شده زیر استفاده کند:

  • start(controller) : بلافاصله هنگام ساخت شیء نامیده می شود. این روش می تواند به منبع جریان دسترسی داشته باشد و هر کار دیگری را برای تنظیم عملکرد جریان انجام دهد. اگر این فرآیند به صورت ناهمزمان انجام شود ، این روش می تواند نوید را برای سیگنال موفقیت یا عدم موفقیت بازگرداند. پارامتر controller منتقل شده به این روش ، ReadableStreamDefaultController است.
  • pull(controller) : می تواند برای کنترل جریان استفاده شود زیرا تکه های بیشتری به دست می آیند. تا زمانی که صف داخلی قطعات جریان پر نباشد ، به طور مکرر خوانده می شود ، تا زمانی که صف به علامت آب زیاد خود نرسد. اگر نتیجه فراخوانی pull() یک وعده باشد ، تا زمانی که گفته شود که وعده داده می شود ، دوباره به pull() فراخوانده نمی شود. اگر قول رد شود ، جریان اشتباه می شود.
  • cancel(reason) : وقتی مصرف کننده جریان جریان را لغو می کند ، فراخوانی می شود.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController از روشهای زیر پشتیبانی می کند:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

بحث دوم ، به همین ترتیب اختیاری ، سازنده ReadableStream() queuingStrategy است. این شیء است که به طور اختیاری یک استراتژی صف را برای جریان تعریف می کند ، که دو پارامتر را می گیرد:

  • highWaterMark : یک عدد غیر منفی که نشانگر آب زیاد جریان با استفاده از این استراتژی صف بندی است.
  • size(chunk) : تابعی که اندازه غیر منفی محدود مقدار قطعه داده شده را محاسبه و باز می گرداند. نتیجه برای تعیین فشار خون ، از طریق خاصیت مناسب ReadableStreamDefaultController.desiredSize استفاده می شود. همچنین با استفاده از روش pull() حاکم است.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

روشهای getReader() و read()

برای خواندن از یک جریان قابل خواندن ، به یک خواننده نیاز دارید که یک ReadableStreamDefaultReader خواهد بود. روش getReader() رابط ReadableStream یک خواننده را ایجاد می کند و جریان را به آن قفل می کند. در حالی که جریان قفل شده است ، تا زمانی که این نسخه آزاد نشود ، هیچ خواننده دیگری قابل دستیابی نیست.

روش read() رابط ReadableStreamDefaultReader وعده ای را برای دسترسی به بخش بعدی در صف داخلی جریان باز می گرداند. بسته به وضعیت جریان ، نتیجه آن را برآورده یا رد می کند. امکانات مختلف به شرح زیر است:

  • اگر یک تکه در دسترس باشد ، قول با یک شیء فرم برآورده می شود
    { value: chunk, done: false } .
  • اگر جریان بسته شود ، قول با یک شی از فرم برآورده می شود
    { value: undefined, done: true } .
  • اگر جریان اشتباه شود ، قول با خطای مربوطه رد می شود.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

خاصیت locked

می توانید با دسترسی به ویژگی ReadableStream.locked ، یک جریان قابل خواندن قفل شده باشد.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نمونه کد جریان قابل خواندن

نمونه کد زیر تمام مراحل عمل را نشان می دهد. شما ابتدا یک ReadableStream ایجاد می کنید که در استدلال underlyingSource خود (یعنی کلاس TimestampSource ) یک روش start() را تعریف می کند. این روش به controller جریان می گوید که هر ثانیه در طی ده ثانیه یک زمان سنج را enqueue() . سرانجام ، به کنترلر می گوید که جریان را close() . شما این جریان را با ایجاد یک خواننده از طریق روش getReader() و فراخوانی read() مصرف می کنید تا جریان done شود.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

تکرار ناهمزمان

اگر جریان done شود ، هر یک از تکرار حلقه read() را بررسی کنید ممکن است راحت ترین API نباشد. خوشبختانه به زودی راه بهتری برای انجام این کار وجود خواهد داشت: تکرار ناهمزمان.

for await (const chunk of stream) {
  console.log(chunk);
}

راه حل برای استفاده از تکرار ناهمزمان امروز ، اجرای رفتار با یک polyfill است.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

یک جریان قابل خواندن

روش tee() رابط ReadableStream جریان قابل خواندن فعلی را به خود اختصاص می دهد ، و یک آرایه دو عنصر حاوی دو شاخه حاصل را به عنوان نمونه های جدید ReadableStream باز می گرداند. این به دو خواننده اجازه می دهد تا یک جریان را همزمان بخوانند. اگر می خواهید پاسخی از سرور داشته باشید و آن را به مرورگر منتقل کنید ، ممکن است این کار را انجام دهید ، به عنوان مثال در یک کارگر سرویس. از آنجا که بدن پاسخ بیش از یک بار قابل مصرف نیست ، برای انجام این کار به دو نسخه نیاز دارید. برای لغو جریان ، باید هر دو شاخه حاصل را لغو کنید. با استفاده از یک جریان ، به طور کلی آن را برای مدت زمان قفل می کند و مانع از قفل شدن سایر خوانندگان آن می شود.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

جریان های بایت قابل خواندن

برای جریانهای نمایندگی بایت ها ، نسخه گسترده ای از جریان قابل خواندن برای کنترل کارآمد بایت ها ، به ویژه با به حداقل رساندن نسخه ها ارائه شده است. جریان های بایت اجازه می دهد تا خوانندگان بافر (BYOB) خود را به دست آورند. اجرای پیش فرض می تواند طیف وسیعی از خروجی های مختلف مانند رشته ها یا بافرهای آرایه را در مورد وب سایت ها ارائه دهد ، در حالی که جریان های بایت خروجی بایت را تضمین می کنند. علاوه بر این ، خوانندگان BYOB مزایای ثبات دارند. این امر به این دلیل است که اگر یک بافر جدا شود ، می تواند تضمین کند که شخص دو بار در همان بافر ننویسد ، از این رو از شرایط مسابقه جلوگیری می کند. خوانندگان BYOB می توانند تعداد دفعاتی را که مرورگر برای اجرای جمع آوری زباله ها نیاز دارد ، کاهش دهد ، زیرا می تواند از بافرها استفاده مجدد کند.

ایجاد یک جریان بایت قابل خواندن

با انتقال یک پارامتر type اضافی به سازنده ReadableStream() می توانید یک جریان بایت قابل خواندن ایجاد کنید.

new ReadableStream({ type: 'bytes' });

زمینه underlyingSource

منبع اصلی یک جریان بایت قابل خواندن برای دستکاری به یک ReadableByteStreamController داده می شود. روش ReadableByteStreamController.enqueue() یک آرگومان chunk را می گیرد که ارزش آن یک ArrayBufferView است. این ملک ReadableByteStreamController.byobRequest درخواست فعلی BYOB را باز می گرداند ، یا در صورت وجود هیچ کدام تهی نمی کند. سرانجام ، ویژگی ReadableByteStreamController.desiredSize اندازه مورد نظر را برای پر کردن صف داخلی جریان کنترل شده برمی گرداند.

queuingStrategy

بحث دوم ، به همین ترتیب اختیاری ، سازنده ReadableStream() queuingStrategy است. این شیء است که به طور اختیاری یک استراتژی صف را برای جریان تعریف می کند ، که یک پارامتر را می گیرد:

  • highWaterMark : تعداد بایت های غیر منفی که نشان دهنده علامت آب زیاد جریان با استفاده از این استراتژی صف است. این برای تعیین فشار خون ، از طریق خاصیت مناسب ReadableByteStreamController.desiredSize استفاده می شود. همچنین با استفاده از روش pull() حاکم است.

روشهای getReader() و read()

سپس می توانید با تنظیم پارامتر mode بر این اساس ، به ReadableStreamBYOBReader دسترسی پیدا کنید: ReadableStream.getReader({ mode: "byob" }) . این امر به منظور جلوگیری از نسخه ها ، کنترل دقیق تری بر تخصیص بافر فراهم می کند. برای خواندن از جریان بایت ، باید با ReadableStreamBYOBReader.read(view) تماس بگیرید ، که در آن view یک ArrayBufferView است.

نمونه کد جریان بایت قابل خواندن

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

عملکرد زیر جریان های بایت قابل خواندن را باز می گرداند که امکان خواندن کارآمد صفر از یک آرایه تولید شده به طور تصادفی را فراهم می کند. به جای استفاده از اندازه قطعه از پیش تعیین شده 1،024 ، سعی می کند بافر تهیه شده از توسعه دهنده را پر کند و امکان کنترل کامل را فراهم کند.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

مکانیک یک جریان قابل نوشتن

جریان قابل نوشتن مقصدی است که می توانید داده ها را بنویسید ، که در JavaScript توسط یک شیء WritableStream نشان داده شده است. این به عنوان انتزاعی در بالای یک سینک زیرزمینی -سینک I/O سطح پایین که داده های خام در آن نوشته شده است ، عمل می کند.

داده ها از طریق یک نویسنده ، یک تکه در یک زمان به جریان نوشته می شوند. یک تکه می تواند اشکال زیادی را به دست آورد ، دقیقاً مانند تکه های یک خواننده. شما می توانید از هر کد مورد نظر برای تولید تکه های آماده برای نوشتن استفاده کنید. نویسنده به علاوه کد مرتبط با آن تولید کننده نامیده می شود.

هنگامی که یک نویسنده ایجاد می شود و شروع به نوشتن به یک جریان (یک نویسنده فعال ) می کند ، گفته می شود که به آن قفل شده است. فقط یک نویسنده می تواند یک زمان به یک جریان قابل نوشتن بنویسد. اگر می خواهید یک نویسنده دیگر شروع به نوشتن در جریان خود کند ، معمولاً باید آن را آزاد کنید ، قبل از آنکه نویسنده دیگری را به آن وصل کنید.

یک صف داخلی قطعاتی را که به جریان نوشته شده اند ، ردیابی می کند اما هنوز توسط سینک زیرین پردازش نشده است.

استراتژی صف بندی شیئی است که تعیین می کند که چگونه یک جریان باید فشار را بر اساس وضعیت صف داخلی خود نشان دهد. استراتژی صف بندی اندازه را به هر قطعه اختصاص می دهد و اندازه کل تمام تکه های موجود در صف را با یک عدد مشخص ، معروف به علامت آب بالا مقایسه می کند.

ساختار نهایی کنترل کننده نامیده می شود. هر جریان قابل نوشتن دارای یک کنترلر مرتبط است که به شما امکان می دهد جریان را کنترل کنید (به عنوان مثال ، آن را سقط کنید).

ایجاد یک جریان قابل نوشتن

رابط WritableStream از API جریان ها یک انتزاع استاندارد برای نوشتن داده های جریان به یک مقصد ، معروف به سینک ، فراهم می کند. این شی با فشار داخلی و صف بندی داخلی همراه است. شما با فراخوانی سازنده آن WritableStream() یک جریان قابل نوشتن ایجاد می کنید. این یک پارامتر underlyingSink اختیاری دارد ، که یک شیء با روش ها و خصوصیاتی را نشان می دهد که نحوه رفتار نمونه جریان ساخته شده را تعریف می کند.

underlyingSink

underlyingSink می تواند روشهای اختیاری و تعریف شده زیر را شامل شود. پارامتر controller که به برخی از روشها منتقل می شود ، یک WritableStreamDefaultController است.

  • start(controller) : این روش بلافاصله هنگام ساخت شیء نامیده می شود. محتوای این روش باید با هدف دسترسی به سینک زیرین هدف قرار گیرد. اگر این فرآیند به صورت ناهمزمان انجام شود ، می تواند وعده ای برای نشان دادن موفقیت یا عدم موفقیت را برگرداند.
  • write(chunk, controller) : این روش هنگامی فراخوانی می شود که یک تکه جدید از داده (که در پارامتر chunk مشخص شده است) آماده است تا روی سینک زیرین نوشته شود. این می تواند وعده ای برای نشان دادن موفقیت یا عدم موفقیت در عملکرد نوشتن را برگرداند. این روش فقط پس از موفقیت نوشته های قبلی فراخوانی خواهد شد ، و هرگز پس از بسته شدن جریان یا سقط جنین.
  • close(controller) : اگر برنامه سیگنال کند که نوشتن تکه ها به جریان را تمام کرده است ، این روش نامیده می شود. محتویات باید هر کاری را که لازم است برای نهایی کردن نوشتن به سینک زیرین انجام دهد و دسترسی به آن را آزاد کند. اگر این فرآیند ناهمزمان باشد ، می تواند وعده ای برای سیگنال موفقیت یا عدم موفقیت را برگرداند. این روش فقط پس از موفقیت تمام صف های صف ، نامیده می شود.
  • abort(reason) : اگر برنامه سیگنال دهد که مایل است به طور ناگهانی جریان را ببندد و آن را در حالت اشتباه قرار دهد ، این روش فراخوانی خواهد شد. این می تواند هر منابع نگهدارنده را مانند close() پاک کند ، اما abort() حتی اگر بنویسد در صف قرار می گیرد ، نامیده می شود. آن تکه ها دور می شوند. اگر این فرآیند ناهمزمان باشد ، می تواند وعده ای برای سیگنال موفقیت یا عدم موفقیت را برگرداند. پارامتر reason حاوی یک DOMString است که توضیح می دهد چرا جریان سقط شده است.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

رابط WritableStreamDefaultController از جریان API نشان دهنده یک کنترلر است که امکان کنترل وضعیت WritableStream را در حین راه اندازی فراهم می کند ، زیرا بخش های بیشتری برای نوشتن یا در پایان نوشتن ارسال می شود. هنگام ساخت یک WritableStream ، به سینک زیرزمینی یک نمونه مربوط به WritableStreamDefaultController مربوطه برای دستکاری داده می شود. WritableStreamDefaultController تنها یک روش دارد: WritableStreamDefaultController.error() ، که باعث ایجاد تعامل آینده با جریان مرتبط با خطا می شود. WritableStreamDefaultController همچنین از یک ویژگی signal پشتیبانی می کند که نمونه ای از AbortSignal را برمی گرداند و اجازه می دهد در صورت لزوم یک عملیات WritableStream متوقف شود.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

استدلال دوم ، به همین ترتیب اختیاری ، سازنده WritableStream() queuingStrategy است. این شیء است که به طور اختیاری یک استراتژی صف را برای جریان تعریف می کند ، که دو پارامتر را می گیرد:

  • highWaterMark : یک عدد غیر منفی که نشانگر آب زیاد جریان با استفاده از این استراتژی صف بندی است.
  • size(chunk) : تابعی که اندازه غیر منفی محدود مقدار قطعه داده شده را محاسبه و باز می گرداند. نتیجه برای تعیین فشار خون ، از طریق خاصیت WritableStreamDefaultWriter.desiredSize تظاهرات استفاده می شود.

روشهای getWriter() و write()

برای نوشتن به یک جریان قابل نوشتن ، به یک نویسنده نیاز دارید که یک WritableStreamDefaultWriter خواهد بود. روش getWriter() رابط WritableStream نمونه جدیدی از WritableStreamDefaultWriter را برمی گرداند و جریان را به آن نمونه قفل می کند. در حالی که جریان قفل شده است ، تا زمان انتشار فعلی ، هیچ نویسنده دیگری قابل دستیابی نیست.

روش write() رابط WritableStreamDefaultWriter ، بخشی از داده های منتقل شده را به یک WritableStream و سینک زیرین آن می نویسد ، سپس قولی را برمی گرداند که مشخص می شود موفقیت یا عدم موفقیت در عملکرد نوشتن را نشان می دهد. توجه داشته باشید که معنای "موفقیت" به معنای سینک زیرین است. این ممکن است نشان دهد که این بخش پذیرفته شده است ، و نه لزوماً که با خیال راحت در مقصد نهایی خود ذخیره می شود.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

خاصیت locked

می توانید با دسترسی به ویژگی WritableStream.locked خود ، یک جریان نوشتاری را قفل کنید.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

نمونه کد جریان قابل نوشتن

نمونه کد زیر تمام مراحل عمل را نشان می دهد.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

لوله کشی یک جریان قابل خواندن به یک جریان قابل نوشتن

یک جریان قابل خواندن را می توان از طریق روش pipeTo() جریان قابل خواندن به یک جریان قابل خواندن منتقل کرد. ReadableStream.pipeTo() ReadableStream فعلی را به یک WritableStream داده شده لوله می کند و وعده ای را برمی گرداند که هنگام انجام روند لوله کشی با موفقیت انجام می شود ، یا در صورت بروز هرگونه خطا ، رد می شود.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

ایجاد یک جریان تبدیل

رابط TransformStream از API جریان ها مجموعه ای از داده های قابل تبدیل را نشان می دهد. شما با فراخوانی سازنده آن TransformStream() ، یک جریان تبدیل ایجاد می کنید ، که یک شیء Transform Stream را از دستگیرهای داده شده ایجاد و برمی گرداند. سازنده TransformStream() به عنوان اولین آرگومان خود ، یک شیء JavaScript اختیاری را نشان می دهد که transformer نشان می دهد. چنین اشیاء می توانند حاوی هر یک از روشهای زیر باشند:

transformer

  • start(controller) : این روش بلافاصله هنگام ساخت شیء نامیده می شود. به طور معمول از این برای استفاده از قطعات پیشوند استفاده می شود ، با استفاده از controller.enqueue() . این بخش ها از طرف قابل خواندن خوانده می شوند اما به هیچ نوشتن به طرف قابل نوشتن بستگی ندارند. اگر این فرآیند اولیه ناهمزمان باشد ، به عنوان مثال ، زیرا برای به دست آوردن قطعات پیشوند تلاش می کند ، این عملکرد می تواند وعده ای را برای سیگنال موفقیت یا عدم موفقیت بازگرداند. یک وعده رد شده خطای جریان را خط می کند. هر استثنائی پرتاب شده توسط سازنده TransformStream() دوباره پرتاب خواهد شد.
  • transform(chunk, controller) : این روش زمانی خوانده می شود که یک تکه جدید که در ابتدا برای طرف قابل نوشتن نوشته شده است ، آماده تبدیل شود. اجرای جریان تضمین می کند که این عملکرد فقط پس از موفقیت در تبدیل های قبلی فراخوانی خواهد شد و هرگز قبل از start() یا بعد از flush() فراخوانی نشده است. این عملکرد کار تحول واقعی جریان تبدیل را انجام می دهد. این می تواند نتایج را با استفاده از controller.enqueue() انجام دهد. این اجازه می دهد تا یک تکه واحد که به طرف قابل نوشتن نوشته شده است ، بسته به چند بار controller.enqueue() نامیده شود. اگر فرآیند تبدیل ناهمزمان باشد ، این عملکرد می تواند وعده ای را برای سیگنال موفقیت یا عدم موفقیت تحول بازگرداند. یک وعده رد شده هر دو طرف قابل خواندن و قابل نوشتن جریان تبدیل را خطا می کند. اگر روش transform() ارائه نشده باشد ، از تبدیل هویت استفاده می شود ، که تکه های بدون تغییر از سمت قابل استفاده به سمت خوانده شده را تغییر می دهد.
  • flush(controller) : این روش پس از گذشت همه تکه هایی که به طرف قابل نوشتن نوشته شده است ، با عبور موفقیت آمیز از طریق transform() تغییر یافته است ، و طرف قابل نوشتن در حال بسته شدن است. به طور معمول از این استفاده می شود تا تکه های پسوند را به طرف قابل خواندن انجام دهد ، قبل از آنکه نیز بسته شود. اگر فرآیند گرگرفتگی ناهمزمان باشد ، این عملکرد می تواند وعده موفقیت یا عدم موفقیت را نشان دهد. نتیجه به تماس گیرنده stream.writable.write() ابلاغ می شود. علاوه بر این ، یک وعده رد شده هر دو طرف قابل خواندن و قابل نوشتن جریان را خطا می کند. پرتاب یک استثنا همان با بازگشت یک وعده رد شده رفتار می شود.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

استراتژی های writableStrategy و readableStrategy

پارامترهای اختیاری دوم و سوم سازنده TransformStream() استراتژی های writableStrategy و readableStrategy اختیاری هستند. آنها به ترتیب در بخش های قابل خواندن و جریان قابل نوشتن تعریف شده اند.

تبدیل نمونه کد جریان

نمونه کد زیر یک جریان تبدیل ساده در عمل را نشان می دهد.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

لوله کشی یک جریان قابل خواندن از طریق یک جریان تبدیل

روش pipeThrough() رابط ReadableStream یک روش زنجیره ای برای لوله کشی جریان فعلی از طریق یک جریان تبدیل یا هر جفت قابل خواندن/قابل خواندن دیگر فراهم می کند. لوله کشی یک جریان به طور کلی آن را برای مدت زمان لوله قفل می کند و مانع از قفل شدن سایر خوانندگان می شود.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

نمونه کد بعدی (کمی متناقض) نشان می دهد که چگونه می توانید یک نسخه "فریاد" از fetch() را اجرا کنید که تمام متن را با استفاده از وعده پاسخ برگشتی به عنوان یک قطعه جریان و طبقه بندی شده توسط تکه ها ، متن می کند. مزیت این رویکرد این است که شما نیازی به انتظار برای بارگیری کل سند ندارید که می تواند هنگام برخورد با پرونده های بزرگ تفاوت زیادی ایجاد کند.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

نسخه ی نمایشی

نسخه ی نمایشی زیر جریان های قابل خواندن ، نوشتاری و تبدیل در عمل را نشان می دهد. همچنین شامل نمونه هایی از زنجیره های لوله pipeThrough() و pipeTo() است ، و همچنین tee() را نشان می دهد. می توانید به صورت اختیاری نسخه ی نمایشی را در پنجره خود اجرا کنید یا کد منبع را مشاهده کنید.

جریان های مفید موجود در مرورگر

تعدادی جریان مفید درست در مرورگر ساخته شده است. به راحتی می توانید یک ReadableStream از حباب ایجاد کنید. روش جریان رابط Blob () یک ReadableStream را برمی گرداند که پس از خواندن داده های موجود در حباب را برمی گرداند. همچنین به یاد بیاورید که یک شیء File نوع خاصی از Blob است و می تواند در هر زمینه ای که یک حباب می تواند استفاده شود.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

انواع جریان TextDecoder.decode() و TextEncoder.encode() به ترتیب TextDecoderStream و TextEncoderStream نامیده می شوند.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

CompressionStream سازی یا فشرده سازی یک پرونده به ترتیب با جریان های تبدیل و DecompressionStream به ترتیب آسان است. نمونه کد زیر نشان می دهد که چگونه می توانید مشخصات جریان ، فشرده سازی (GZIP) را درست در مرورگر بارگیری کنید و فایل فشرده شده را مستقیماً روی دیسک بنویسید.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API ' FileSystemWritableFileStream و جریان های آزمایشی fetch() نمونه هایی از جریان های قابل نوشتن در طبیعت است.

API سریال از هر دو جریان قابل خواندن و قابل خواندن استفاده می کند.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

سرانجام ، WebSocketStream API جریان را با API WebSocket ادغام می کند.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

منابع مفید

قدردانی ها

این مقاله توسط جیک Archibald ، François Beaufort ، Sam Dutton ، Mattias Buelens ، Surma ، Joe Medley و Adam Rice بررسی شده است. پست های وبلاگ جیک Archibald در درک جریان به من کمک زیادی کرده است. برخی از نمونه های کد با الهام از کاوش های کاربر GitHub BellBind و بخش هایی از نثر ساخته شده است که به شدت در اسناد وب MDN در جریان ها ساخته شده است. نویسندگان Streams Standard در نوشتن این مشخصات کار فوق العاده ای انجام داده اند. تصویر قهرمان توسط رایان لارا در Unsplash .